iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 21
0
Modern Web

Angular新手村學習筆記(2019)系列 第 21

Day21_RxJS基本介紹-Subject

  • 分享至 

  • xImage
  •  

YA,倒數囉,因為我是最後一天開賽,看已經陸續有人完賽(解脫)好羨慕呀。
真想趕快看別人的文章~

這一樣一樣是Kevin大大講解

[S05E04]RxJS 基本介紹 - Subject
https://www.youtube.com/watch?v=9udVLO947kk&list=PL9LUW6O9WZqgUMHwDsKQf3prtqVvjGZ6S&index=19

Subject的範例

 * Access the event object with the `$event` argument passed to the output event
 * handler:
export class EventEmitter<T extends any> extends Subject<T> {

 /**
  * Emits an event containing a given value.
  * @param value The value to emit.
 */
 emit(value?: T) { super.next(value); } // emit就是作Subject的next()
  • multi cast

subject 文件導讀

https://rxjs-dev.firebaseapp.com/guide/subject
https://cn.rx.js.org/manual/overview.html#subject-

  • subject是observable
    可以被subscribe
  • subject也是observer
    subject有observer的特性:next(v),error(e),complete()

subject有分4種

使用情境及行為都不太一樣

  1. Multicasted Observables : Reference counting
  2. BehaviorSubject
  3. ReplaySubject
  4. AsyncSubject
    前3種較常用

簡單的說,看JerryHong大大的這2篇就夠了:

  1. 普通Subject
var a1=new Rx.Subject();
a1.subscribe(console.log);
a1.next(1) // 1
a1.next(2) // 2
a1.complete(); // 完成後就要不到東西啦
a1.next(1); // 沒東西
  1. Multicasted Observables(比較難,建議看文件)
    Subjects 是將任意 Observable 執行共享給多個observer的唯一方式

利用Subject去註冊一個Observable,其他人再去subscribe這個Subject
Multicasted 的行為
Reference counting

<input type="text" #postId />
<button (click)="queryComment(postId.value)">get Comment</button>

{{ data$ | async | json }} async寫一次就subscribe一次
{{ data$ | async | json }} 
{{ data$ | async | json }} 
如果寫3次,怎麼只處理1次,其他subscribe取得資料又正確?
(每次subscribe都會建立stram(資料流),如何讓後面的subscribe共用同1份資料?)
export class AppComponent {
    // 1. 當<button (click)="queryComment(postId.value)">get Comment</button>
    queryComment(id) {
        this.query$.next(id); 
    }
    
    // 2. 第1次subject傳入的值是1,之後才是postId.value
    query$ = new BehaviorSubject(1);
    // 3. 用pipe串接,用mergeMap回傳
    data$ = this.getPostComment(1);

    // 4. http.get
    // {{ data$ | async | json }} 寫3次就會subscribe 3次,建立3條stream
    getPostComment(id) {
        return this.http.get('https://jsonplaceholder.typicode.com/posts/$(id)')
                        .pipe(shareReplay());
    );
    
    constructor(private http: HttpClient){}

範例2:

獨立資料流的寫法

var source = Rx.Observable.interval(1000).take(5);
// 分別下2個subscribe
source.subscribe(d => console.log('A:'+d);
source.subscribe(d => console.log('B:'+d);
// 這2個subscribe是不能分享資料的(各自獨立)

有共用的寫法

var source = Rx.Observable.interval(1000).take(10);

// 利用Subject去註冊一個Observable,其他人再去subscribe這個Subject
// Multicasted 的行為
var sub = new Rx.Subject(); // 先建一個Subject
// 定義第1個subscribe
sub.subscribe(d => console.log('A:'+d);
// 開始跑第1個sub
source.subscribe(sub);

// 定義第2個subscribe
sub.subscribe(d => console.log('B:'+d); // 此時B不會從0開始跑(會跟A一起)

// 此時的source.subscribe(sub);
         ^^^^^共享的資料流 ^^ 有2個就會跑2個
  1. BehaviorSubject

Subject 的其中一个变体就是 BehaviorSubject,它有一个“当前值”的概念。它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到“当前值”。

都會留下 最新的值(多個observer在訂閱的情況下)
瓽有新的observer做subscribe時,立刻從BehaviorSubject接收 最新的的值

使用情境:
BehaviorSubject適用用來表示「隨著時間會變動的值」,例如:生日 為 Subject,年紀 為 BehaviorSubject

var b1 = new Rx.BehaviorSubject('defaultValue');
                ^^^^^^^^^^^^^^^
b1.subscribe(console.log); // defaultValue
b1.next('second value');
b1.subscribe(console.log); // second value 會保留最新的
// BehaviorSubject有額外的method可以取當下的值.getValue()
b1.getValue(); // 等於 b1.value
  • asObservable() // Subject都有
    表示回傳的型別是Observable,代表外面使用他時,不能用.next()傳值
b1.asObservable();
  • 餵Rx.BehaviorSubject吃Promise()
var p = new Promise((resolve, reject)=>{
    setTimeout(()=>{
        console.log('a');
    },100);
});
b1 = new Rx.BehaviorSubject(p);
b1.value; // Promise {<resolved>: "a"} // 感覺可以餵,但結果怪怪的
b1.value.then(console.log); // a
                            // Promise {<resolved>: undefined}
  • 應用範例
<input type="text" #postId />
<button (click)="queryComment(postId.value)">get Comment</button>

{{ data$ | async | json }} 好像是這邊的async做subscribe的動作?
                           async寫一次就subscribe一次
import { BehaviorSubject } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
...
export class AppComponent {
    // 1. 當<button (click)="queryComment(postId.value)">
    //              get Comment
    //      </button>
    queryComment(id) {
        this.query$.next(id); 
    }
    
    // 付責傳接值
    // 2. 第1次subject傳入的值是1,之後才是postId.value
    query$ = new BehaviorSubject(1);
    // 3. 用pipe串接,用mergeMap回傳
    // mergeMap 請參考 https://ithelp.ithome.com.tw/articles/10188387
                     // vvvv pipe後面都是Observable,而非Subject
    data$ = this.query$.pipe( // mergeMap回傳Observable
    //^^^ data是Observable(因為沒有subscribe)
    
        // 資料取回後並回傳Oberservable
        mergeMap(id => this.getPostComment(id)) 
      //^^^ map加上mergeAll  ^^^^^^^^^^^^^^  
      //                    可以並行處理多個 observable,並把回傳重疊
        
// 如果用switchMap,怕http get同事回來時,資料打架,會只保留最後一份資料
// 例如:使用者狂點下載,後端api還是會request多次,是回傳回來後前端只保留最後1一次
        
// 只要RxJS的operator能吃 Observable,就能吃Promise
// 例如: mergeMap(id => fetch('...')),
//                      ^^^^ 回傳promise
    );

    // 4. http.get
    getPostComment(id) {
        // httpClient的get是Observable
        return this.http.get('https://jsonplaceholder.typicode.com/posts/$(id)'
    );
    
    constructor(private http: HttpClient){}
  1. ReplaySubject
    可保留設定的筆數(保留最後n筆資料)
var r = new Rx.ReplaySubject(3);
r.next(1);
r.next(2);
r.next(3);
r.next(4);
r.subscribe(console.log); // 同時間取得2,3,4(最後3筆)
  1. AsyncSubject
    只有當Observable complete()時,才會將最後1個值發送給observer
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});

// next(1)~next(4)不會console.log任何值出來
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

subject.next(5);
// 當下complete()時,才會把最後一次next(5)的值吐出來
subject.complete();

// Logs:
// observerA: 5
// observerB: 5

Subject其他使用情境

管理多個Observable的時候

import { BehaviorSubject, Subject } from 'rxjs';
import { mergeMap, map, share, shareReplay, takeUntil } from 'rxjs/operators';
                                            ^^^^^^^^^
export class AppComponent{
    destory$ = new Subject();
    
    getPostComment(id){
        return this.http.get('https://jsonplaceholder.typicode.com/posts/$(id)')
                        .pipe(
                            takeUntil(this.destroy$),
                                           ^^^^^^^^ 取到destroy$有值就不再取了
                            shareReplay()
                            );
    }
    
    ngOnDestroy(){
        this.destroy$.next();
        this.destroy$.complete(); 
        // 這樣就能把所有不會停的Observable一起停掉
        // 不用管理subscription
    }

上一篇
Day20_JS Array 基本介紹(AngularTaiwan線上讀書會第5季-主題:RxJS)
下一篇
Day22_Observable & RxJS in Angular
系列文
Angular新手村學習筆記(2019)33
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言